package cKafka

import (
	"time"

	"gitee.com/csingo/cLog"
)

// load stores the configured health-check interval on the container and
// launches a background goroutine that, once per interval, logs the
// connection state of every client the container reports.
//
// The goroutine loops forever; nothing in this function stops it.
func load() {
	container.healthcheck = kafka_config.HealthCheck

	// sweep performs a single health-check pass: one log entry per client,
	// at Error level when the connection reports closed, Trace otherwise.
	sweep := func() {
		for _, id := range container.GetAllClientIDs() {
			// NOTE(review): assumes getClient yields a usable client for
			// every ID just listed — confirm it cannot return nil here.
			target := container.getClient(id)
			operators := container.getOperatorIDs(id)
			closed := target.client.Closed()

			// NOTE(review): a nil context is passed to the logger; confirm
			// cLog.WithContext accepts nil.
			entry := cLog.WithContext(nil, map[string]interface{}{
				"source":    "cKafka.load",
				"client":    id,
				"operators": operators,
			})
			if closed {
				entry.Error("cKafka 健康检查 fail，客户端连接已断开")
				continue
			}
			entry.Trace("cKafka 健康检查 success")
		}
	}

	go func() {
		for {
			if container.healthcheck > 0 {
				sweep()
				// The interval is read again here rather than cached, so the
				// value used for the pause is the one current after the sweep.
				time.Sleep(time.Duration(container.healthcheck) * time.Second)
				continue
			}

			// A non-positive interval skips the sweep; the field is
			// re-examined after a fixed 10-second pause.
			time.Sleep(10 * time.Second)
		}
	}()
}
